{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%run startup.py"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "application/javascript": [
       "$.getScript('./assets/js/ipython_notebook_toc.js')"
      ],
      "text/plain": [
       "<IPython.core.display.Javascript object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "%%javascript\n",
    "$.getScript('./assets/js/ipython_notebook_toc.js')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# A Decision Tree of Observable Operators\n",
    "\n",
    "## Part 6: Looking at Entire Streams\n",
    "\n",
    "> source: http://reactivex.io/documentation/operators.html#tree.  \n",
    "> (transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, [axiros](http://www.axiros.com))  \n",
    "\n",
    "**This tree can help you find the ReactiveX Observable operator you’re looking for.**  \n",
    "See [Part 1](./A%20Decision%20Tree%20of%20Observable%20Operators.%20Part%20I%20-%20Creation.ipynb) for usage and output instructions.  \n",
    "\n",
    "We also require acquaintance with the [marble diagrams](./Marble%20Diagrams.ipynb) feature of RxPY.\n",
    "\n",
    "<h2 id=\"tocheading\">Table of Contents</h2>\n",
    "<div id=\"toc\"></div>\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# I want to evaluate the entire sequence of items emitted by an Observable"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emit a single boolean indicating if all of the items pass some test **[all](http://reactivex.io/documentation/operators/all.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== all ==========\n",
      "\n",
      "module rx.linq.observable.all\n",
      "@extensionmethod(Observable, alias=\"every\")\n",
      "def all(self, predicate):\n",
      "    Determines whether all elements of an observable sequence satisfy a\n",
      "    condition.\n",
      "\n",
      "    1 - res = source.all(lambda value: value.length > 3)\n",
      "\n",
      "    Keyword arguments:\n",
      "    :param bool predicate: A function to test each element for a condition.\n",
      "\n",
      "    :returns: An observable sequence containing a single element determining\n",
      "    whether all elements in the source sequence pass the test in the\n",
      "    specified predicate.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.5     M New subscription on stream 276540965\n",
      "   3.9     M [next]    2.4: True\n",
      "   4.2     M [cmpl]    2.6: fin\n",
      "\n",
      "   4.7     M New subscription on stream 276540989\n",
      "   5.1     M [next]    0.3: False\n",
      "   5.3     M [cmpl]    0.5: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.all)\n",
    "for i in 9, 10:\n",
    "    d = subs(O.range(10, 20).all(lambda v: v > i))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emit a single boolean indicating if the Observable emitted *any* item (that passes some test) **[contains, find_index, some](http://reactivex.io/documentation/operators/contains.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== contains ==========\n",
      "\n",
      "module rx.linq.observable.contains\n",
      "@extensionmethod(Observable)\n",
      "def contains(self, value, comparer=None):\n",
      "    Determines whether an observable sequence contains a specified\n",
      "    element with an optional equality comparer.\n",
      "\n",
      "    Example\n",
      "    1 - res = source.contains(42)\n",
      "    2 - res = source.contains({ \"value\": 42 }, lambda x, y: x[\"value\"] == y[\"value\")\n",
      "\n",
      "    Keyword parameters:\n",
      "    value -- The value to locate in the source sequence.\n",
      "    comparer -- {Function} [Optional] An equality comparer to compare elements.\n",
      "\n",
      "    Returns an observable {Observable} sequence containing a single element\n",
      "    determining whether the source sequence contains an element that has\n",
      "    the specified value.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.2     M New subscription on stream 276590957\n",
      "   2.0     M [next]    0.6: True\n",
      "   2.2     M [cmpl]    0.7: fin\n",
      "\n",
      "\n",
      "========== equality operation ==========\n",
      "\n",
      "\n",
      "   3.4     M New subscription on stream 276590973\n",
      "   3.8     M [next]    0.3: True\n",
      "   3.9     M [cmpl]    0.4: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.contains)\n",
    "d = subs(O.range(10, 20).contains(11))\n",
    "header(\"equality operation\")\n",
    "d = subs(O.range(10, 20).contains(11, comparer=lambda x, y: y == x + 1))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== find_index ==========\n",
      "\n",
      "module rx.linq.observable.findindex\n",
      "@extensionmethod(Observable)\n",
      "def find_index(self, predicate):\n",
      "    Searches for an element that matches the conditions defined by the\n",
      "    specified predicate, and returns an Observable sequence with the\n",
      "    zero-based index of the first occurrence within the entire Observable\n",
      "    sequence.\n",
      "\n",
      "    Keyword Arguments:\n",
      "    predicate -- {Function} The predicate that defines the conditions of the\n",
      "        element to search for.\n",
      "\n",
      "    Returns an observable {Observable} sequence with the zero-based index of\n",
      "    the first occurrence of an element that matches the conditions defined\n",
      "    by match, if found; otherwise, -1.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.6     M New subscription on stream 276591037\n",
      "   2.0     M comparer args: 4 [0] <rx.core.anonymousobservable.AnonymousObservable object at 0x107bae610>\n",
      "   2.3     M comparer args: 1 [1] <rx.core.anonymousobservable.AnonymousObservable object at 0x107bae610>\n",
      "   2.5     M comparer args: 2 [2] <rx.core.anonymousobservable.AnonymousObservable object at 0x107bae610>\n",
      "   2.8     M comparer args: 1 [3] <rx.core.anonymousobservable.AnonymousObservable object at 0x107bae610>\n",
      "   3.0     M [next]    1.4: 3\n",
      "   3.1     M [cmpl]    1.5: fin\n",
      "\n",
      "\n",
      "========== some ==========\n",
      "\n",
      "module rx.linq.observable.some\n",
      "@extensionmethod(Observable)\n",
      "def some(self, predicate=None):\n",
      "    Determines whether some element of an observable sequence satisfies a\n",
      "    condition if present, else if some items are in the sequence.\n",
      "\n",
      "    Example:\n",
      "    result = source.some()\n",
      "    result = source.some(lambda x: x > 3)\n",
      "\n",
      "    Keyword arguments:\n",
      "    predicate -- A function to test each element for a condition.\n",
      "\n",
      "    Returns {Observable} an observable sequence containing a single element\n",
      "    determining whether some elements in the source sequence pass the test\n",
      "    in the specified predicate if given, else if some items are in the\n",
      "    sequence.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.6     M New subscription on stream 276563317\n",
      "   2.0     M comparer args: 4\n",
      "   2.3     M comparer args: 1\n",
      "   2.7     M comparer args: 2\n",
      "   2.9     M comparer args: 1\n",
      "   3.0     M [next]    1.3: True\n",
      "   3.2     M [cmpl]    1.5: fin\n"
     ]
    }
   ],
   "source": [
    "def comparer(*a):\n",
    "    ''' find_index: you get value, index and the observable itself as arguments\n",
    "        some: you get only the value\n",
    "    '''\n",
    "    log('comparer args:', *a)\n",
    "    l.append(1)\n",
    "    if len(l) > 3:\n",
    "        # => True from the 4th call on, so find_index emits index 3\n",
    "        return True\n",
    "    \n",
    "stream = O.from_((4, 1, 2, 1, 3))\n",
    "for name in 'find_index', 'some':    \n",
    "    l = []\n",
    "    operator = getattr(stream, name)\n",
    "    rst(operator) # output documentation, reset timer\n",
    "    d = subs(operator(lambda *a: comparer(*a)))\n",
    "\n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emit a single boolean indicating if the Observable emitted no items **[is_empty](http://reactivex.io/documentation/operators/contains.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== is_empty ==========\n",
      "\n",
      "module rx.linq.observable.isempty\n",
      "@extensionmethod(Observable)\n",
      "def is_empty(self):\n",
      "    Determines whether an observable sequence is empty.\n",
      "\n",
      "    Returns an observable {Observable} sequence containing a single element\n",
      "    determining whether the source sequence is empty.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.9     M New subscription on stream 276596853\n",
      "   2.4     M [next]    0.4: True\n",
      "   2.7     M [cmpl]    0.7: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.is_empty)\n",
    "d = subs(O.from_([]).is_empty())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emit a single boolean indicating if the sequence is identical to one emitted by a second Observable **[sequence_equal](http://reactivex.io/documentation/operators/sequenceequal.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 75,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== sequence_equal ==========\n",
      "\n",
      "module rx.linq.observable.sequenceequal\n",
      "@extensionmethod(Observable)\n",
      "def sequence_equal(self, second, comparer=None):\n",
      "    Determines whether two sequences are equal by comparing the\n",
      "    elements pairwise using a specified equality comparer.\n",
      "\n",
      "    1 - res = source.sequence_equal([1,2,3])\n",
      "    2 - res = source.sequence_equal([{ \"value\": 42 }], lambda x, y: x.value == y.value)\n",
      "    3 - res = source.sequence_equal(Observable.return_value(42))\n",
      "    4 - res = source.sequence_equal(Observable.return_value({ \"value\": 42 }), lambda x, y: x.value == y.value)\n",
      "\n",
      "    second -- Second observable sequence or array to compare.\n",
      "    comparer -- [Optional] Comparer used to compare elements of both sequences.\n",
      "\n",
      "    Returns an observable sequence that contains a single element which\n",
      "    indicates whether both sequences are of equal length and their\n",
      "    corresponding elements are equal according to the specified equality\n",
      "    comparer.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   4.8     M New subscription on stream 279265221\n",
      "   5.3     M got r R\n",
      "   5.6     M got x X\n",
      "   6.4     M got p P\n",
      "   6.8     M got y Y\n",
      "   7.0     M got    \n",
      "   7.4     M got r R\n",
      "   7.6     M got o O\n",
      "   8.0     M got c C\n",
      "   8.5     M got k K\n",
      "   8.9     M got s S\n",
      "   9.4     M [next]    4.6: True\n",
      "   9.6     M [cmpl]    4.8: fin\n",
      "\n",
      "\n",
      "========== there is no order guarantee of arguments in the comparer: ==========\n",
      "\n",
      "\n",
      "  11.4     M New subscription on stream 276578009\n",
      "  27.6     M got a A\n",
      " 139.1     M got b B\n",
      " 749.0  T988 got C c\n",
      " 749.6  T988 [next]  738.1: False\n",
      " 749.7  T988 [cmpl]  738.2: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.sequence_equal)\n",
    "def f(x, y):\n",
    "    log('got', x, y)\n",
    "    return str(y) == str(x).upper()\n",
    "d = subs(O.from_('rxpy rocks').sequence_equal(\n",
    "         O.from_('RXPY ROCKS'), lambda x, y: f(x, y)))\n",
    "\n",
    "header(\"there is no order guarantee of arguments in the comparer:\")\n",
    "d = subs(marble_stream('a-b------c-d|').sequence_equal(\n",
    "         marble_stream('A-B----C-D|'), lambda x, y: f(x, y)))\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emit the average of all of their values **[average](http://reactivex.io/documentation/operators/average.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 83,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== average ==========\n",
      "\n",
      "module rx.linq.observable.average\n",
      "@extensionmethod(Observable)\n",
      "def average(self, key_selector=None):\n",
      "    Computes the average of an observable sequence of values that are in\n",
      "    the sequence or obtained by invoking a transform function on each\n",
      "    element of the input sequence if present.\n",
      "\n",
      "    Example\n",
      "    res = source.average();\n",
      "    res = source.average(lambda x: x.value)\n",
      "\n",
      "    :param Observable self: Observable to average.\n",
      "    :param types.FunctionType key_selector: A transform function to apply to\n",
      "        each element.\n",
      "\n",
      "    :returns: An observable sequence containing a single element with the\n",
      "        average of the sequence of values.\n",
      "    :rtype: Observable\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   2.5     M New subscription on stream 279255829\n",
      "   4.0     M [next]    1.4: 5.0\n",
      "   4.1     M [cmpl]    1.5: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.average)\n",
    "d = subs(O.from_('1199').average(lambda x: int(x)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emit the sum of all of their values **[sum](http://reactivex.io/documentation/operators/sum.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 84,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== sum ==========\n",
      "\n",
      "module rx.linq.observable.sum\n",
      "@extensionmethod(Observable)\n",
      "def sum(self, key_selector=None):\n",
      "    Computes the sum of a sequence of values that are obtained by\n",
      "    invoking an optional transform function on each element of the input\n",
      "    sequence, else if not specified computes the sum on each item in the\n",
      "    sequence.\n",
      "\n",
      "    Example\n",
      "    res = source.sum()\n",
      "    res = source.sum(lambda x: x.value)\n",
      "\n",
      "    key_selector -- {Function} [Optional] A transform function to apply to\n",
      "        each element.\n",
      "\n",
      "    Returns an observable {Observable} sequence containing a single element\n",
      "    with the sum of the values in the source sequence.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   2.6     M New subscription on stream 276565865\n",
      "   3.9     M [next]    1.2: 20\n",
      "   4.1     M [cmpl]    1.4: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.sum)\n",
    "d = subs(O.from_('1199').sum(lambda x: int(x)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emit a number indicating how many items were in the sequence **[count](http://reactivex.io/documentation/operators/count.html)**\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 88,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== count ==========\n",
      "\n",
      "module rx.linq.observable.count\n",
      "@extensionmethod(Observable)\n",
      "def count(self, predicate=None):\n",
      "    Returns an observable sequence containing a value that represents\n",
      "    how many elements in the specified observable sequence satisfy a\n",
      "    condition if provided, else the count of items.\n",
      "\n",
      "    1 - res = source.count()\n",
      "    2 - res = source.count(lambda x: x > 3)\n",
      "\n",
      "    Keyword arguments:\n",
      "    :param types.FunctionType predicate: A function to test each element for a\n",
      "        condition.\n",
      "\n",
      "    :returns: An observable sequence containing a single element with a\n",
      "    number that represents how many elements in the input sequence satisfy\n",
      "    the condition in the predicate function if provided, else the count of\n",
      "    items in the sequence.\n",
      "    :rtype: Observable\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.8     M New subscription on stream 279255949\n",
      "   3.6     M [next]    1.6: 2\n",
      "   3.7     M [cmpl]    1.8: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.count)\n",
    "d = subs(O.from_('1199').count(lambda x: int(x) > 2))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emit the item with the maximum value **[max, max_by](http://reactivex.io/documentation/operators/max.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 91,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== max ==========\n",
      "\n",
      "module rx.linq.observable.max\n",
      "@extensionmethod(Observable)\n",
      "def max(self, comparer=None):\n",
      "    Returns the maximum value in an observable sequence according to the\n",
      "    specified comparer.\n",
      "\n",
      "    Example\n",
      "    res = source.max()\n",
      "    res = source.max(lambda x, y:  x.value - y.value)\n",
      "\n",
      "    Keyword arguments:\n",
      "    comparer -- {Function} [Optional] Comparer used to compare elements.\n",
      "\n",
      "    Returns {Observable} An observable sequence containing a single element\n",
      "    with the maximum element in the source sequence.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.5     M New subscription on stream 279255829\n",
      "   3.0     M [next]    1.3: 1\n",
      "   3.1     M [cmpl]    1.5: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.max)\n",
    "d = subs(O.from_('1199').max(lambda x, y: int(x) + int(y) < 5))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 97,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== max_by ==========\n",
      "\n",
      "module rx.linq.observable.maxby\n",
      "@extensionmethod(Observable)\n",
      "def max_by(self, key_selector, comparer=None):\n",
      "    Returns the elements in an observable sequence with the maximum\n",
      "    key value according to the specified comparer.\n",
      "\n",
      "    Example\n",
      "    res = source.max_by(lambda x: x.value)\n",
      "    res = source.max_by(lambda x: x.value, lambda x, y: x - y)\n",
      "\n",
      "    Keyword arguments:\n",
      "    key_selector -- {Function} Key selector function.\n",
      "    comparer -- {Function} [Optional] Comparer used to compare key values.\n",
      "\n",
      "    Returns an observable {Observable} sequence containing a list of zero\n",
      "    or more elements that have a maximum key value.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.5     M New subscription on stream 276577929\n",
      "   2.8     M [next]    1.1: ['1', '2', '1', '2', '4']\n",
      "   3.3     M [cmpl]    1.7: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.max_by)\n",
    "d = subs(O.from_('1271246').max_by(lambda x: int(x)  < 5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and emit the item with the minimum value **[min, min_by](http://reactivex.io/documentation/operators/min.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 99,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== min ==========\n",
      "\n",
      "module rx.linq.observable.min\n",
      "@extensionmethod(Observable)\n",
      "def min(self, comparer=None):\n",
      "    Returns the minimum element in an observable sequence according to\n",
      "    the optional comparer else a default greater than less than check.\n",
      "\n",
      "    Example\n",
      "    res = source.min()\n",
      "    res = source.min(lambda x, y: x.value - y.value)\n",
      "\n",
      "    comparer -- {Function} [Optional] Comparer used to compare elements.\n",
      "\n",
      "    Returns an observable sequence {Observable} containing a single element\n",
      "    with the minimum element in the source sequence.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.5     M New subscription on stream 279257813\n",
      "   2.7     M [next]    1.0: 1\n",
      "   2.8     M [cmpl]    1.2: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.min)\n",
    "d = subs(O.from_('1199').min(lambda x, y: int(x) + int(y) > 5))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 100,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== min_by ==========\n",
      "\n",
      "module rx.linq.observable.minby\n",
      "@extensionmethod(Observable)\n",
      "def min_by(self, key_selector, comparer=None):\n",
      "    Returns the elements in an observable sequence with the minimum key\n",
      "    value according to the specified comparer.\n",
      "\n",
      "    Example\n",
      "    res = source.min_by(lambda x: x.value)\n",
      "    res = source.min_by(lambda x: x.value, lambda x, y: x - y)\n",
      "\n",
      "    Keyword arguments:\n",
      "    key_selector -- {Function} Key selector function.\n",
      "    comparer -- {Function} [Optional] Comparer used to compare key values.\n",
      "\n",
      "    Returns an observable {Observable} sequence containing a list of zero\n",
      "    or more elements that have a minimum key value.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.7     M New subscription on stream 276595497\n",
      "   3.1     M [next]    1.3: ['7', '6']\n",
      "   3.3     M [cmpl]    1.6: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.min_by)\n",
    "d = subs(O.from_('1271246').min_by(lambda x: int(x)  < 5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... by applying an aggregation function to each item in turn (or recursing into its result) and emitting the result **[scan, expand](http://reactivex.io/documentation/operators/scan.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 110,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== scan ==========\n",
      "\n",
      "module rx.linq.observable.scan\n",
      "@extensionmethod(Observable)\n",
      "def scan(self, accumulator, seed=None):\n",
      "    Applies an accumulator function over an observable sequence and\n",
      "    returns each intermediate result. The optional seed value is used as\n",
      "    the initial accumulator value. For aggregation behavior with no\n",
      "    intermediate results, see Observable.aggregate.\n",
      "\n",
      "    1 - scanned = source.scan(lambda acc, x: acc + x)\n",
      "    2 - scanned = source.scan(lambda acc, x: acc + x, 0)\n",
      "\n",
      "    Keyword arguments:\n",
      "    accumulator -- An accumulator function to be invoked on each element.\n",
      "    seed -- [Optional] The initial accumulator value.\n",
      "\n",
      "    Returns an observable sequence containing the accumulated values.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   2.4     M New subscription on stream 278466789\n",
      "   3.0     M [next]    0.4: ['1', 1]\n",
      "   3.2     M [next]    0.7: ['2', 3]\n",
      "   3.5     M [next]    0.9: ['3', 6]\n",
      "   3.7     M [next]    1.2: ['4', 10]\n",
      "   4.0     M [next]    1.4: ['5', 15]\n",
      "   4.2     M [cmpl]    1.7: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.scan)\n",
    "# printing original value next to the aggregate:\n",
    "d = subs(O.from_('12345').scan(\n",
    "        lambda x, y: [y, int(x[1]) + int(y)],\n",
    "        seed=[0, 0]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 115,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== expand ==========\n",
      "\n",
      "module rx.linq.observable.expand\n",
      "@extensionmethod(Observable)\n",
      "def expand(self, selector, scheduler=None):\n",
      "    Expands an observable sequence by recursively invoking selector.\n",
      "\n",
      "    selector -- {Function} Selector function to invoke for each produced\n",
      "        element, resulting in another sequence to which the selector will be\n",
      "        invoked recursively again.\n",
      "    scheduler -- {Scheduler} [Optional] Scheduler on which to perform the\n",
      "        expansion. If not provided, this defaults to the current thread\n",
      "        scheduler.\n",
      "\n",
      "    Returns an observable {Observable} sequence containing all the elements\n",
      "    produced by the recursive expansion.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   4.0     M New subscription on stream 278470673\n",
      "   4.6     M [next]    0.5: 42\n",
      "   5.3     M [next]    1.2: 84\n",
      "   5.7     M [next]    1.6: 126\n",
      "   6.3     M [next]    2.1: 168\n",
      "   6.5     M [next]    2.4: 210\n",
      "   6.6     M [cmpl]    2.5: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.expand)\n",
    "# each emitted value is fed back into the selector; take(5) bounds the otherwise endless expansion:\n",
    "d = subs(O.just(42).expand(lambda x: O.just(42 + x)).take(5))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
